package org.budo.warehouse.logic.task;

import java.sql.Timestamp;
import java.util.List;

import javax.annotation.Resource;
import javax.sql.DataSource;

import org.budo.support.dao.page.Page;
import org.budo.support.javax.sql.util.JdbcUtil;
import org.budo.time.Time;
import org.budo.warehouse.service.api.IDataNodeService;
import org.budo.warehouse.service.api.IEntryBufferService;
import org.budo.warehouse.service.api.ServiceDynamicBeanProvider;
import org.budo.warehouse.service.entity.DataNode;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

/**
 * @author lmw
 */
@Slf4j
@Component("budoWarehouseHaHealthCheckTask")
public class BudoWarehouseHaHealthCheckTask {
    @Resource
    private IDataNodeService dataNodeService;

    @Resource
    private ServiceDynamicBeanProvider serviceDynamicBeanProvider;

    @Resource
    private IEntryBufferService entryBufferService;

    /**
     * @see org.budo.warehouse.logic.filter.EventFilterLogicImpl#filterEntry(Pipeline,DataEntry)
     */
    public void warehouseHealthCheck() {
        List<DataNode> sourceDataNodes = dataNodeService.listSourceDataNodes(Page.max());
        for (DataNode dataNode : sourceDataNodes) {
            String url = dataNode.getUrl();
            if (url.startsWith("jdbc:mysql://") //
                    && !url.contains("mysql.rds.aliyuncs.com") // 阿里 rds 带这个,就不要了
                    && !url.contains("rwlb.rds.aliyuncs.com")) { // 阿里PolarDB
                this.createTable(dataNode);
            }

            Timestamp max = entryBufferService.findMaxCreatedAtByDataNodeId(dataNode.getId());
            if (null == max || Time.when(max).isBefore(Time.now().plusMinute(-3))) {
                log.error("#51 max=" + Time.when(max).toString("yyyyMMdd.HHmmss.SSS") + ", dataNode=" + dataNode);
            }
        }

        // 清理数据
        Integer count = entryBufferService.deleteByFlushedAtLessThan(Time.now().plusHour(-12).toTimestamp());
        log.info("#55 已清理 " + count + " 条 EntryBuffer 数据");
    }

    private void createTable(DataNode dataNode) {
        DataSource dataSource = serviceDynamicBeanProvider.dataSource(dataNode);
        String sql = " CREATE TABLE IF NOT EXISTS `mysql`.`ha_health_check` ( `id` int(11) ) ";

        try {
            JdbcUtil.executeUpdate(dataSource, sql);
        } catch (Throwable e) {
            if (("" + e).contains("CREATE command denied to user")) {
                log.error("#66 createTable error for dataNode, e=" + e);
                return;
            }

            throw new RuntimeException("#62 dataNode=" + dataNode.getUrl() + ", " + dataNode.getUsername() + ", e=" + e, e);
        }
    }
}